1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package rx.internal.operators;
17
18 import static org.junit.Assert.assertEquals;
19 import static org.mockito.Matchers.isA;
20 import static org.mockito.Mockito.inOrder;
21 import static org.mockito.Mockito.mock;
22 import static org.mockito.Mockito.spy;
23 import static org.mockito.Mockito.times;
24
25 import java.util.ArrayList;
26 import java.util.Arrays;
27 import java.util.List;
28 import java.util.NoSuchElementException;
29 import java.util.concurrent.atomic.AtomicLong;
30
31 import org.junit.Test;
32 import org.mockito.InOrder;
33
34 import rx.Observable;
35 import rx.Observer;
36 import rx.Subscriber;
37 import rx.functions.Action1;
38 import rx.functions.Func1;
39 import rx.functions.Func2;
40
41 public class OperatorSingleTest {
42
43 @Test
44 public void testSingle() {
45 Observable<Integer> observable = Observable.just(1).single();
46
47 @SuppressWarnings("unchecked")
48 Observer<Integer> observer = mock(Observer.class);
49 observable.subscribe(observer);
50
51 InOrder inOrder = inOrder(observer);
52 inOrder.verify(observer, times(1)).onNext(1);
53 inOrder.verify(observer, times(1)).onCompleted();
54 inOrder.verifyNoMoreInteractions();
55 }
56
57 @Test
58 public void testSingleWithTooManyElements() {
59 Observable<Integer> observable = Observable.just(1, 2).single();
60
61 @SuppressWarnings("unchecked")
62 Observer<Integer> observer = mock(Observer.class);
63 observable.subscribe(observer);
64
65 InOrder inOrder = inOrder(observer);
66 inOrder.verify(observer, times(1)).onError(
67 isA(IllegalArgumentException.class));
68 inOrder.verifyNoMoreInteractions();
69 }
70
71 @Test
72 public void testSingleWithEmpty() {
73 Observable<Integer> observable = Observable.<Integer> empty().single();
74
75 @SuppressWarnings("unchecked")
76 Observer<Integer> observer = mock(Observer.class);
77 observable.subscribe(observer);
78
79 InOrder inOrder = inOrder(observer);
80 inOrder.verify(observer, times(1)).onError(
81 isA(NoSuchElementException.class));
82 inOrder.verifyNoMoreInteractions();
83 }
84
85 @Test
86 public void testSingleDoesNotRequestMoreThanItNeedsToEmitItem() {
87 final AtomicLong request = new AtomicLong();
88 Observable.just(1).doOnRequest(new Action1<Long>() {
89 @Override
90 public void call(Long n) {
91 request.addAndGet(n);
92 }
93 }).toBlocking().single();
94 assertEquals(2, request.get());
95 }
96
97 @Test
98 public void testSingleDoesNotRequestMoreThanItNeedsToEmitErrorFromEmpty() {
99 final AtomicLong request = new AtomicLong();
100 try {
101 Observable.empty().doOnRequest(new Action1<Long>() {
102 @Override
103 public void call(Long n) {
104 request.addAndGet(n);
105 }
106 }).toBlocking().single();
107 } catch (NoSuchElementException e) {
108 assertEquals(2, request.get());
109 }
110 }
111
112 @Test
113 public void testSingleDoesNotRequestMoreThanItNeedsToEmitErrorFromMoreThanOne() {
114 final AtomicLong request = new AtomicLong();
115 try {
116 Observable.just(1, 2).doOnRequest(new Action1<Long>() {
117 @Override
118 public void call(Long n) {
119 request.addAndGet(n);
120 }
121 }).toBlocking().single();
122 } catch (IllegalArgumentException e) {
123 assertEquals(2, request.get());
124 }
125 }
126
127 @Test
128 public void testSingleDoesNotRequestMoreThanItNeedsIf1Then2Requested() {
129 final List<Long> requests = new ArrayList<Long>();
130 Observable.just(1)
131
132 .doOnRequest(new Action1<Long>() {
133 @Override
134 public void call(Long n) {
135 requests.add(n);
136 }
137 })
138
139 .single()
140
141 .subscribe(new Subscriber<Integer>() {
142
143 @Override
144 public void onStart() {
145 request(1);
146 }
147
148 @Override
149 public void onCompleted() {
150
151 }
152
153 @Override
154 public void onError(Throwable e) {
155
156 }
157
158 @Override
159 public void onNext(Integer t) {
160 request(2);
161 }
162 });
163 assertEquals(Arrays.asList(2L), requests);
164 }
165
166 @Test
167 public void testSingleDoesNotRequestMoreThanItNeedsIf3Requested() {
168 final List<Long> requests = new ArrayList<Long>();
169 Observable.just(1)
170
171 .doOnRequest(new Action1<Long>() {
172 @Override
173 public void call(Long n) {
174 requests.add(n);
175 }
176 })
177
178 .single()
179
180 .subscribe(new Subscriber<Integer>() {
181
182 @Override
183 public void onStart() {
184 request(3);
185 }
186
187 @Override
188 public void onCompleted() {
189
190 }
191
192 @Override
193 public void onError(Throwable e) {
194
195 }
196
197 @Override
198 public void onNext(Integer t) {
199 }
200 });
201 assertEquals(Arrays.asList(2L), requests);
202 }
203
204 @Test
205 public void testSingleRequestsExactlyWhatItNeedsIf1Requested() {
206 final List<Long> requests = new ArrayList<Long>();
207 Observable.just(1)
208
209 .doOnRequest(new Action1<Long>() {
210 @Override
211 public void call(Long n) {
212 requests.add(n);
213 }
214 })
215
216 .single()
217
218 .subscribe(new Subscriber<Integer>() {
219
220 @Override
221 public void onStart() {
222 request(1);
223 }
224
225 @Override
226 public void onCompleted() {
227
228 }
229
230 @Override
231 public void onError(Throwable e) {
232
233 }
234
235 @Override
236 public void onNext(Integer t) {
237 }
238 });
239 assertEquals(Arrays.asList(2L), requests);
240 }
241
242
243 @Test
244 public void testSingleWithPredicate() {
245 Observable<Integer> observable = Observable.just(1, 2).single(
246 new Func1<Integer, Boolean>() {
247
248 @Override
249 public Boolean call(Integer t1) {
250 return t1 % 2 == 0;
251 }
252 });
253
254 @SuppressWarnings("unchecked")
255 Observer<Integer> observer = mock(Observer.class);
256 observable.subscribe(observer);
257
258 InOrder inOrder = inOrder(observer);
259 inOrder.verify(observer, times(1)).onNext(2);
260 inOrder.verify(observer, times(1)).onCompleted();
261 inOrder.verifyNoMoreInteractions();
262 }
263
264 @Test
265 public void testSingleWithPredicateAndTooManyElements() {
266 Observable<Integer> observable = Observable.just(1, 2, 3, 4).single(
267 new Func1<Integer, Boolean>() {
268
269 @Override
270 public Boolean call(Integer t1) {
271 return t1 % 2 == 0;
272 }
273 });
274
275 @SuppressWarnings("unchecked")
276 Observer<Integer> observer = mock(Observer.class);
277 observable.subscribe(observer);
278
279 InOrder inOrder = inOrder(observer);
280 inOrder.verify(observer, times(1)).onError(
281 isA(IllegalArgumentException.class));
282 inOrder.verifyNoMoreInteractions();
283 }
284
285 @Test
286 public void testSingleWithPredicateAndEmpty() {
287 Observable<Integer> observable = Observable.just(1).single(
288 new Func1<Integer, Boolean>() {
289
290 @Override
291 public Boolean call(Integer t1) {
292 return t1 % 2 == 0;
293 }
294 });
295 @SuppressWarnings("unchecked")
296 Observer<Integer> observer = mock(Observer.class);
297 observable.subscribe(observer);
298
299 InOrder inOrder = inOrder(observer);
300 inOrder.verify(observer, times(1)).onError(
301 isA(NoSuchElementException.class));
302 inOrder.verifyNoMoreInteractions();
303 }
304
305 @Test
306 public void testSingleOrDefault() {
307 Observable<Integer> observable = Observable.just(1).singleOrDefault(2);
308
309 @SuppressWarnings("unchecked")
310 Observer<Integer> observer = mock(Observer.class);
311 observable.subscribe(observer);
312
313 InOrder inOrder = inOrder(observer);
314 inOrder.verify(observer, times(1)).onNext(1);
315 inOrder.verify(observer, times(1)).onCompleted();
316 inOrder.verifyNoMoreInteractions();
317 }
318
319 @Test
320 public void testSingleOrDefaultWithTooManyElements() {
321 Observable<Integer> observable = Observable.just(1, 2).singleOrDefault(
322 3);
323
324 @SuppressWarnings("unchecked")
325 Observer<Integer> observer = mock(Observer.class);
326 observable.subscribe(observer);
327
328 InOrder inOrder = inOrder(observer);
329 inOrder.verify(observer, times(1)).onError(
330 isA(IllegalArgumentException.class));
331 inOrder.verifyNoMoreInteractions();
332 }
333
334 @Test
335 public void testSingleOrDefaultWithEmpty() {
336 Observable<Integer> observable = Observable.<Integer> empty()
337 .singleOrDefault(1);
338
339 @SuppressWarnings("unchecked")
340 Observer<Integer> observer = mock(Observer.class);
341 observable.subscribe(observer);
342
343 InOrder inOrder = inOrder(observer);
344 inOrder.verify(observer, times(1)).onNext(1);
345 inOrder.verify(observer, times(1)).onCompleted();
346 inOrder.verifyNoMoreInteractions();
347 }
348
349 @Test
350 public void testSingleOrDefaultWithPredicate() {
351 Observable<Integer> observable = Observable.just(1, 2).singleOrDefault(
352 4, new Func1<Integer, Boolean>() {
353
354 @Override
355 public Boolean call(Integer t1) {
356 return t1 % 2 == 0;
357 }
358 });
359
360 @SuppressWarnings("unchecked")
361 Observer<Integer> observer = mock(Observer.class);
362 observable.subscribe(observer);
363
364 InOrder inOrder = inOrder(observer);
365 inOrder.verify(observer, times(1)).onNext(2);
366 inOrder.verify(observer, times(1)).onCompleted();
367 inOrder.verifyNoMoreInteractions();
368 }
369
370 @Test
371 public void testSingleOrDefaultWithPredicateAndTooManyElements() {
372 Observable<Integer> observable = Observable.just(1, 2, 3, 4)
373 .singleOrDefault(6, new Func1<Integer, Boolean>() {
374
375 @Override
376 public Boolean call(Integer t1) {
377 return t1 % 2 == 0;
378 }
379 });
380
381 @SuppressWarnings("unchecked")
382 Observer<Integer> observer = mock(Observer.class);
383 observable.subscribe(observer);
384
385 InOrder inOrder = inOrder(observer);
386 inOrder.verify(observer, times(1)).onError(
387 isA(IllegalArgumentException.class));
388 inOrder.verifyNoMoreInteractions();
389 }
390
391 @Test
392 public void testSingleOrDefaultWithPredicateAndEmpty() {
393 Observable<Integer> observable = Observable.just(1).singleOrDefault(2,
394 new Func1<Integer, Boolean>() {
395
396 @Override
397 public Boolean call(Integer t1) {
398 return t1 % 2 == 0;
399 }
400 });
401
402 @SuppressWarnings("unchecked")
403 Observer<Integer> observer = mock(Observer.class);
404 observable.subscribe(observer);
405
406 InOrder inOrder = inOrder(observer);
407 inOrder.verify(observer, times(1)).onNext(2);
408 inOrder.verify(observer, times(1)).onCompleted();
409 inOrder.verifyNoMoreInteractions();
410 }
411
412 @Test
413 public void testSingleWithBackpressure() {
414 Observable<Integer> observable = Observable.just(1, 2).single();
415
416 Subscriber<Integer> subscriber = spy(new Subscriber<Integer>() {
417
418 @Override
419 public void onStart() {
420 request(1);
421 }
422
423 @Override
424 public void onCompleted() {
425
426 }
427
428 @Override
429 public void onError(Throwable e) {
430
431 }
432
433 @Override
434 public void onNext(Integer integer) {
435 request(1);
436 }
437 });
438 observable.subscribe(subscriber);
439
440 InOrder inOrder = inOrder(subscriber);
441 inOrder.verify(subscriber, times(1)).onError(isA(IllegalArgumentException.class));
442 inOrder.verifyNoMoreInteractions();
443 }
444
445 @Test(timeout = 30000)
446 public void testIssue1527() throws InterruptedException {
447
448 Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6);
449 Observable<Integer> reduced = source.reduce(new Func2<Integer, Integer, Integer>() {
450 @Override
451 public Integer call(Integer i1, Integer i2) {
452 return i1 + i2;
453 }
454 });
455
456 Integer r = reduced.toBlocking().first();
457 assertEquals(21, r.intValue());
458 }
459 }